Apache Flink-এ মেশিন লার্নিং (ML) মডেল বাস্তবায়ন করার জন্য Flink-এর স্ট্রিম প্রসেসিং এবং ব্যাচ প্রসেসিং উভয় সুবিধা ব্যবহার করা যায়। Flink-এর ML লাইব্রেরি, TensorFlow বা অন্য কোনো লাইব্রেরি ইন্টিগ্রেট করে মডেল বাস্তবায়ন করা যায়। Flink সাধারণত স্ট্রিমিং ডেটার উপর মডেল ট্রেনিং এবং প্রেডিকশন উভয় কাজেই ব্যবহার করা হয়।
Flink সেটআপ এবং ডিপেন্ডেন্সি কনফিগারেশন:
ML মডেল লোড বা ট্রেনিং:
ডেটা সোর্স এবং ডেটা প্রসেসিং:
নিচের উদাহরণে, আমরা একটি প্রেডিকশন ML মডেল ব্যবহার করবো যা আগে থেকেই TensorFlow দিয়ে ট্রেন করা হয়েছে:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.api.common.functions.MapFunction;
import org.tensorflow.SavedModelBundle;
import org.tensorflow.Tensor;
public class FlinkMLExample {
public static void main(String[] args) throws Exception {
// Flink Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ডেটা সোর্স (উদাহরণ: সিম্পল ইন্টিজার স্ট্রিম)
DataStream<Integer> inputData = env.fromElements(1, 2, 3, 4, 5);
// TensorFlow মডেল লোড করা
SavedModelBundle model = SavedModelBundle.load("path/to/saved/model", "serve");
// Map Function ব্যবহার করে প্রতিটি ইনপুট ডেটার উপর প্রেডিকশন করা
SingleOutputStreamOperator<Float> predictions = inputData.map(new MapFunction<Integer, Float>() {
@Override
public Float map(Integer value) throws Exception {
// ইনপুট ডেটা টেন্সর হিসেবে রূপান্তর করা
Tensor<Integer> inputTensor = Tensor.create(new int[]{value});
// মডেল থেকে প্রেডিকশন নেওয়া
Tensor<Float> result = model.session().runner()
.feed("input_tensor_name", inputTensor)
.fetch("output_tensor_name")
.run().get(0)
.expect(Float.class);
// প্রেডিকশন রিটার্ন করা
float[] prediction = new float[1];
result.copyTo(prediction);
return prediction[0];
}
});
// আউটপুট দেখানো
predictions.print();
// কাজটি শুরু করা
env.execute("Flink TensorFlow Prediction Example");
}
}
env.fromElements(1, 2, 3, 4, 5)
একটি সিম্পল ইন্টিজার স্ট্রিম তৈরি করে।SavedModelBundle.load
মেথড ব্যবহার করে পূর্বে সংরক্ষিত TensorFlow মডেল লোড করা হয়েছে।TensorFlow Serving
বা TensorFlow Lite
ব্যবহার করে মডেল অপ্টিমাইজ করা যেতে পারে।Linear Regression
, KMeans
, ইত্যাদি সাপোর্ট করে।KMeans kMeans = new KMeans()
.setK(3)
.setMaxIterations(10);
DataSet<KMeansModel> model = kMeans.fit(trainingData);
এই পদ্ধতি ব্যবহার করে, আপনি Flink-এ স্ট্রিম বা ব্যাচ ডেটার উপর বিভিন্ন ধরনের মেশিন লার্নিং মডেল ট্রেন এবং প্রেডিকশন করতে পারবেন।
আরও দেখুন...